package org.example.hive;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.example.utils.JDBCUtils;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

/**
 * @author lwc
 * @description: Demo job that runs the same SQL twice: once over a plain JDBC
 *               connection (printing columns and rows to stdout) and once via
 *               Spark's JDBC data source, with connection settings taken from
 *               spark.app.* configuration.
 * @date 2023/11/29 14:40
 */
@Slf4j
public class DWSSelectExample {

    /** No-op placeholder; kept unchanged so any existing callers still compile. */
    public static void Test() {

    }

    /**
     * Entry point. Reads connection parameters from SparkConf (spark.app.* keys,
     * with local-development defaults), first verifies the database by dumping the
     * query result over plain JDBC, then loads the same query through Spark's JDBC
     * data source and shows it.
     *
     * @param args logged for diagnostics only; configuration comes from SparkConf
     * @throws SQLException if the JDBC query or metadata access fails
     */
    public static void main(String[] args) throws SQLException {

        SparkConf sparkConf = new SparkConf().setAppName("Spark HBase Example").setMaster("local[*]");
        // Connection settings are overridable via spark.app.* config; the defaults
        // below are local-development values only.
        String ip = sparkConf.get("spark.app.ip", "127.0.0.1");
        String port = sparkConf.get("spark.app.port", "5432");
        String db = sparkConf.get("spark.app.db", "test");
        String username = sparkConf.get("spark.app.username", "root");
        // SECURITY(review): a hard-coded fallback password should be removed from
        // source control and injected via configuration / a secret store instead.
        String password = sparkConf.get("spark.app.password", "9XME3z94xs9nhCj");
        String sql = sparkConf.get("spark.app.sql", "select * from config limit 1");
        String type = sparkConf.get("spark.app.type", "postgresql");

        // Create the SparkSession (Hive support enabled, driven by sparkConf).
        SparkSession spark = SparkSession.builder()
                .appName("HBase to MySQL Sync")
                .config(sparkConf)
                .enableHiveSupport()
                .getOrCreate();
        try {
            log.info("=================开始查询数据===================");
            for (String arg : args) {
                // Parameterized logging instead of string concatenation.
                log.info("------{}", arg);
            }
            String mysqlURL = "jdbc:" + type + "://" + ip + ":" + port + "/" + db;

            log.info("地址为： {}", mysqlURL);
            log.info("用户的ip为：{}", ip);
            log.info("端口：{}", port);
            log.info("数据库为：{}", db);
            log.info("用户为：{}", username);
            // FIX: the clear-text password was previously written to the log;
            // mask it so the secret never appears in log output.
            log.info("密码为：{}", "******");
            log.info("sql：{}", sql);
            log.info("type为：{}", type);

            // Sanity-check the database over plain JDBC before involving Spark.
            dumpQueryResult(mysqlURL, username, password, sql);

            // Load the same query through Spark's JDBC data source.
            Dataset<Row> data = spark.read()
                    .format("jdbc")
                    .option("url", mysqlURL)
                    .option("dbtable", "(" + sql + ") as test") // 使用子查询作为表别名
                    .option("user", username)
                    .option("password", password)
                    .load();

            // 打印数据
            data.show();

            // Show just the first row as a second sanity check.
            data.limit(1).show();
        } finally {
            // FIX: always release Spark resources, even if the JDBC step throws.
            spark.stop();
        }
    }

    /**
     * Runs {@code sql} over a plain JDBC connection obtained from
     * {@link JDBCUtils} and prints every column name and every row value to
     * stdout (output format unchanged from the original inline code).
     *
     * @param url      full JDBC URL
     * @param username database user
     * @param password database password
     * @param sql      query to execute
     * @throws SQLException on any JDBC failure
     */
    private static void dumpQueryResult(String url, String username, String password, String sql)
            throws SQLException {
        // FIX: try-with-resources closes the Connection and ResultSet, which the
        // original code leaked. (The Statement inside JDBCUtils.executeQuery is
        // not reachable from here; closing the Connection releases it.)
        try (Connection connection = JDBCUtils.getConnection(url, username, password);
             ResultSet resultSet = JDBCUtils.executeQuery(connection, sql)) {
            log.info("connection:{}", connection);
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            // 输出列名
            for (int i = 1; i <= columnCount; i++) {
                String columnName = metaData.getColumnName(i);
                System.out.println("Column " + i + ": " + columnName);
            }
            // 遍历ResultSet
            while (resultSet.next()) {
                // 遍历每一列
                for (int i = 1; i <= columnCount; i++) {
                    String columnValue = resultSet.getString(i);
                    System.out.println("Column " + i + " Value: " + columnValue);
                }
            }
        }
    }

}
